C#使用UDP实现可靠的传输传输(数据包的分组发送)

您所在的位置:网站首页 udp 发送大数据块 C#使用UDP实现可靠的传输传输(数据包的分组发送)

C#使用UDP实现可靠的传输传输(数据包的分组发送)

2024-07-12 11:15| 来源: 网络整理| 查看: 265

背景

在做C#中面向无连接的传输时用到了UDP,虽然UDP协议没有TCP协议稳定可靠,但是在传输效率上要高些,优势也有,缺点也有就是有的时候要丢包,有的时候不得不用UDP,但是如何才能比较稳定的实现可靠传输呢,这是一个问题!

TCP传输数据的时候没有大小限制,但是UDP传输的时候是有大小限制的,我们怎么才能够实现大数据的稳定传输呢。我们想到了,把数据包分包,把一个大数据分割为一系列的小数据包然后分开发送,然后服务端收到了就拼凑起完整数据,如果遇到中途丢包就重发。

实现

UDP线程类,实现数据的分包发送和重发。具体的接收操作需要实现其中的事件

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Net.Sockets; using Model; using System.Net; using Tool; using System.Threading; namespace ZZUdp.Core { //udp的类 public class UDPThread { #region 私有变量 UdpClient client;//UDP客户端 List sendlist;// 用于轮询是否发送成功的记录 Dictionary RecListDic = new Dictionary();//数据接收列表,每一个sequence对应一个 IPEndPoint remotIpEnd = null;//用来在接收数据的时候对远程主机的信息存放 int port=6666;//定义服务器的端口号 #endregion #region 属性 public int CheckQueueTimeInterval { get; set; }//检查发送队列间隔 public int MaxResendTimes { get; set; }//没有收到确认包时,最大重新发送的数目,超过此数目会丢弃并触发PackageSendFailture事件 #endregion #region 事件 /// /// 当数据包收到时触发 /// public event EventHandler PackageReceived; /// /// 当数据包收到事件触发时,被调用 /// /// 包含事件的参数 protected virtual void OnPackageReceived(PackageEventArgs e) { if (PackageReceived != null) PackageReceived(this, e); } /// /// 数据包发送失败 /// public event EventHandler PackageSendFailure; /// /// 当数据发送失败时调用 /// /// 包含事件的参数 protected virtual void OnPackageSendFailure(PackageEventArgs e) { if (PackageSendFailure != null) PackageSendFailure(this, e); } /// /// 数据包未接收到确认,重新发送 /// public event EventHandler PackageResend; /// /// 触发重新发送事件 /// /// 包含事件的参数 protected virtual void OnPackageResend(PackageEventArgs e) { if (PackageResend != null) PackageResend(this, e); } #endregion //无参构造函数 public UDPThread() { } //构造函数 public UDPThread(string ipaddress, int port) { IPAddress ipA = IPAddress.Parse(ipaddress);//构造远程连接的参数 IPEndPoint ipEnd = new IPEndPoint(ipA, port); client = new UdpClient();// client = new UdpClient(ipEnd)这样的话就没有创建远程连接 client.Connect(ipEnd);//使用指定的远程主机信息建立默认远程主机连接 sendlist = new List(); CheckQueueTimeInterval = 2000;//轮询间隔时间 MaxResendTimes = 5;//最大发送次数 new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();//启动轮询线程 //开始监听数据 AsyncReceiveData(); } /// /// 同步数据接收方法 /// public void ReceiveData() { while (true) { IPEndPoint retip = null; UdpPacket udpp = null; try { byte[] data = client.Receive(ref retip);//接收数据,当Client端连接主机的时候,retip就变成Cilent端的IP了 udpp = (UdpPacket)SerializationUnit.DeserializeObject(data); } catch (Exception ex) { //异常处理操作 } if (udpp != null) { PackageEventArgs arg = new PackageEventArgs(udpp, retip); OnPackageReceived(arg);//数据包收到触发事件 } } } //异步接受数据 public void AsyncReceiveData() { try { client.BeginReceive(new AsyncCallback(ReceiveCallback), null); } catch (SocketException ex) { throw ex; } } //接收数据的回调函数 public void ReceiveCallback(IAsyncResult param) { if (param.IsCompleted) { UdpPacket udpp = null; try { byte[] data = client.EndReceive(param, ref remotIpEnd);//接收数据,当Client端连接主机的时候,test就变成Cilent端的IP了 udpp = (UdpPacket)SerializationUnit.DeserializeObject(data); } catch (Exception ex) { //异常处理操作 } finally { AsyncReceiveData(); } if (udpp != null)//触发数据包收到事件 { PackageEventArgs arg = new PackageEventArgs(udpp, null); OnPackageReceived(arg); } } } /// /// 同步发送分包数据 /// /// public void SendData(Msg message) { ICollection udpPackets = UdpPacketSplitter.Split(message); foreach (UdpPacket udpPacket in udpPackets) { byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket); //使用同步发送 client.Send(udpPacketDatagram, udpPacketDatagram.Length,udpPacket.remoteip); if (udpPacket.IsRequireReceiveCheck) PushSendItemToList(udpPacket);//将该消息压入列表 } } /// /// 异步分包发送数组的方法 /// /// public void AsyncSendData(Msg message) { ICollection udpPackets = UdpPacketSplitter.Split(message); foreach (UdpPacket udpPacket in udpPackets) { byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket); //使用同步发送 //client.Send(udpPacketDatagram, udpPacketDatagram.Length); //使用异步的方法发送数据 this.client.BeginSend(udpPacketDatagram, udpPacketDatagram.Length, new AsyncCallback(SendCallback), null); } } //发送完成后的回调方法 public void SendCallback(IAsyncResult param) { if (param.IsCompleted) { try { client.EndSend(param);//这句话必须得写,BeginSend()和EndSend()是成对出现的 } catch (Exception e) { //其他处理异常的操作 } } } static object lockObj = new object(); /// /// 自由线程,检测未发送的数据并发出,存在其中的就是没有收到确认包的数据包 /// void CheckUnConfirmedQueue() { do { if (sendlist.Count > 0) { UdpPacket[] array = null; lock (sendlist) { array = sendlist.ToArray(); } //挨个重新发送并计数 Array.ForEach(array, s => { s.sendtimes++; if (s.sendtimes >= MaxResendTimes) { //sOnPackageSendFailure//出发发送失败事件 sendlist.Remove(s);//移除该包 } else { //重新发送 byte[] udpPacketDatagram = SerializationUnit.SerializeObject(s); client.Send(udpPacketDatagram, udpPacketDatagram.Length, s.remoteip); } }); } Thread.Sleep(CheckQueueTimeInterval);//间隔一定时间重发数据 } while (true); } /// /// 将数据信息压入列表 /// /// void PushSendItemToList(UdpPacket item) { sendlist.Add(item); } /// /// 将数据包从列表中移除 /// /// 数据包编号 /// 数据包分包索引 public void PopSendItemFromList(long packageNo, int packageIndex) { lock (lockObj) { Array.ForEach(sendlist.Where(s => s.sequence == packageNo && s.index == packageIndex).ToArray(), s => sendlist.Remove(s)); } } /// /// 关闭客户端并释放资源 /// public void Dispose() { if (client != null) { client.Close(); client = null; } } } } 首先是数据信息实体类 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Net; namespace Model { //封装消息类 [Serializable] public class Msg { //所属用户的用户名 public string name { get; set; } //所属用户的ip public string host { get; set; } //命令的名称 public string command { get; set; } //收信人的姓名 public string desname { get; set; } //你所发送的消息的目的地ip,应该是对应在服务器的列表里的主键值 public string destinationIP { get; set; } //端口号 public int port { get; set; } //文本消息 public string msg { get; set; } //二进制消息 public byte[] byte_msg { get; set; } //附加数据 public string extend_msg { get; set; } //时间戳 public DateTime time { get; set; } //构造函数 public Msg(string command,string desip,string msg,string host) { this.command = command; this.destinationIP = desip; this.msg = msg; this.time = DateTime.Now; this.host = host; } override public string ToString() { return name + "说:" + msg; } } } MSG数据分割后生成分包数据

分包实体类

using System; using System.Collections.Generic; using System.Linq; using System.Text; using Tool; using System.Net; namespace Model { [Serializable] public class UdpPacket { public long sequence{get;set;}//所属组的唯一序列号 包编号 public int total { get; set; }//分包总数 public int index { get; set; }//消息包的索引 public byte[] data { get; set; }//包的内容数组 public int dataLength { get; set; }//分割的数组包大小 public int remainder { get; set; }//最后剩余的数组的数据长度 public int sendtimes { get; set; }//发送次数 public IPEndPoint remoteip { get; set; }//接受该包的远程地址 public bool IsRequireReceiveCheck { get; set; }//获得或设置包收到时是否需要返回确认包 public static int HeaderSize = 30000; public UdpPacket(long sequence, int total, int index, byte[] data, int dataLength, int remainder,string desip,int port) { this.sequence = sequence; this.total = total; this.index = index; this.data = data; this.dataLength = dataLength; this.remainder = remainder; this.IsRequireReceiveCheck = true;//默认都需要确认包 //构造远程地址 IPAddress ipA = IPAddress.Parse(desip); this.remoteip = new IPEndPoint(ipA, port); } //把这个对象生成byte[] public byte[] ToArray() { return SerializationUnit.SerializeObject(this); } } } 数据包分割工具类 using System; using System.Collections.Generic; using System.Linq; using System.Text; using Tool; namespace Model { /// /// UDP数据包分割器 /// public static class UdpPacketSplitter { public static ICollection Split(Msg message) { byte[] datagram = null; try { datagram = SerializationUnit.SerializeObject(message); } catch (Exception e) { //AddTalkMessage("数据转型异常"); } //产生一个序列号,用来标识包数据属于哪一组 Random Rd = new Random(); long SequenceNumber = Rd.Next(88888, 999999); ICollection udpPackets = UdpPacketSplitter.Split(SequenceNumber, datagram, 10240, message.destinationIP, message.port); return udpPackets; } /// /// 分割UDP数据包 /// /// UDP数据包所持有的序号 /// 被分割的UDP数据包 /// 分割块的长度 /// /// 分割后的UDP数据包列表 /// public static ICollection Split(long sequence, byte[] datagram, int chunkLength,string desip,int port) { if (datagram == null) throw new ArgumentNullException("datagram"); List packets = new List(); int chunks = datagram.Length / chunkLength; int remainder = datagram.Length % chunkLength; int total = chunks; if (remainder > 0) total++; for (int i = 1; i 0) { int length = datagram.Length - (chunkLength * chunks); byte[] chunk = new byte[length]; Buffer.BlockCopy(datagram, chunkLength * chunks, chunk, 0, length); packets.Add(new UdpPacket(sequence, total, total, chunk, chunkLength, remainder, desip, port)); } return packets; } } } 服务端存储数据的数据结构 using System; using System.Collections.Generic; using System.Linq; using System.Text; using Tool; using Model; namespace Model { //一个sequence对应一组的数据包的数据结构 public class RecDataList { public long sequence { get; set; }//序列号 //对应的存储包的List List RecudpPackets = new List(); public int total { get; set; } public int dataLength { get; set; } public int remainder { get; set; } public byte[] DataBuffer = null; public RecDataList(UdpPacket udp) { this.sequence = udp.sequence; this.total = udp.total; this.dataLength = udp.dataLength; this.remainder = udp.remainder; if (DataBuffer == null) { DataBuffer = new byte[dataLength * (total - 1) + remainder]; } } public RecDataList(long sequence, int total, int chunkLength, int remainder) { this.sequence = sequence; this.total = total; this.dataLength = chunkLength; this.remainder = remainder; if (DataBuffer == null) { DataBuffer = new byte[this.dataLength * (this.total - 1) + this.remainder]; } } public void addPacket(UdpPacket p) { RecudpPackets.Add(p); } public Msg show() { if (RecudpPackets.Count == total)//表示已经收集满了 { //重组数据 foreach (UdpPacket udpPacket in RecudpPackets) { //偏移量 int offset = (udpPacket.index - 1) * udpPacket.dataLength; Buffer.BlockCopy(udpPacket.data, 0, DataBuffer, offset, udpPacket.data.Length); } Msg rmsg = (Msg)SerializationUnit.DeserializeObject(DataBuffer); DataBuffer = null; RecudpPackets.Clear(); return rmsg; } else { return null; } } public bool containskey(UdpPacket udp) { foreach (UdpPacket udpPacket in RecudpPackets) { if (udpPacket.index == udp.index) return true; } return false; } } } 编码工具类 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.Serialization.Formatters.Binary; using System.IO; namespace Tool { public class EncodingTool { //编码 public static byte[] EncodingASCII(string buf) { byte[] data = Encoding.Unicode.GetBytes(buf); return data; } //解码 public static string DecodingASCII(byte[] bt) { string st = Encoding.Unicode.GetString(bt); return st; } //编码 public static byte[] EncodingUTF_8(string buf) { byte[] data = Encoding.UTF8.GetBytes(buf); return data; } //编码 public static string DecodingUTF_8(byte[] bt) { string st = Encoding.UTF8.GetString(bt); return st; } } } 序列化和反序列化的工具类 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.Serialization.Formatters.Binary; using System.IO; namespace Tool { public class SerializationUnit { /// /// 把对象序列化为字节数组 /// public static byte[] SerializeObject(object obj) { if (obj == null) return null; //内存实例 MemoryStream ms = new MemoryStream(); //创建序列化的实例 BinaryFormatter formatter = new BinaryFormatter(); formatter.Serialize(ms, obj);//序列化对象,写入ms流中 ms.Position = 0; //byte[] bytes = new byte[ms.Length];//这个有错误 byte[] bytes = ms.GetBuffer(); ms.Read(bytes, 0, bytes.Length); ms.Close(); return bytes; } /// /// 把字节数组反序列化成对象 /// public static object DeserializeObject(byte[] bytes) { object obj = null; if (bytes == null) return obj; //利用传来的byte[]创建一个内存流 MemoryStream ms = new MemoryStream(bytes); ms.Position = 0; BinaryFormatter formatter = new BinaryFormatter(); obj = formatter.Deserialize(ms);//把内存流反序列成对象 ms.Close(); return obj; } /// /// 把字典序列化 /// /// /// public static byte[] SerializeDic(Dictionary dic) { if (dic.Count == 0) return null; MemoryStream ms = new MemoryStream(); BinaryFormatter formatter = new BinaryFormatter(); formatter.Serialize(ms, dic);//把字典序列化成流 byte[] bytes = new byte[ms.Length];//从流中读出byte[] ms.Read(bytes, 0, bytes.Length); return bytes; } /// /// 反序列化返回字典 /// /// /// public static Dictionary DeserializeDic(byte[] bytes) { Dictionary dic = null; if (bytes == null) return dic; //利用传来的byte[]创建一个内存流 MemoryStream ms = new MemoryStream(bytes); ms.Position = 0; BinaryFormatter formatter = new BinaryFormatter(); //把流中转换为Dictionary dic = (Dictionary)formatter.Deserialize(ms); return dic; } } } 通用的数据包事件类 using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Text; using System.Threading; using Model; namespace ZZUdp.Core { /// /// 数据包事件数据 /// public class PackageEventArgs : EventArgs { /// /// 网络消息包 /// public UdpPacket udpPackage { get; set; } /// /// 网络消息包组 /// public UdpPacket[] udpPackages { get; set; } /// /// 远程IP /// public IPEndPoint RemoteIP { get; set; } /// /// 是否已经处理 /// public bool IsHandled { get; set; } /// /// 创建一个新的 PackageEventArgs 对象. /// public PackageEventArgs(UdpPacket package, IPEndPoint RemoteIP) { this.udpPackage = package; this.RemoteIP = RemoteIP; this.IsHandled = false; } } }



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3